A coroutine is a kind of function that can suspend and resume its execution at various pre-defined locations in its code. Subroutines are a special case of coroutines that have just a single entry point and complete their execution by returning to their caller. Python's coroutines (both the existing generator-based and the newly proposed variety) are not fully general, either, since they can only transfer control back to their caller when suspending their execution, as opposed to switching to some other coroutine as they can in the general case. When coupled with an event loop, coroutines can be used to do asynchronous processing, I/O in particular.
Python's current coroutine support is based on the enhanced generators from PEP 342, which was adopted into Python 2.5. That PEP changed the yield statement to be an expression, added several new methods for generators (send(), throw(), and close()), and ensured that close() would be called when generators get garbage-collected. That functionality was further enhanced in Python 3.3 with PEP 380, which added the yield from expression to allow a generator to delegate some of its functionality to another generator (i.e. a sub-generator).
Coroutines in Python were originally built on generators. A generator is a function that produces a sequence of results instead of a single value. When a generator function is called, it returns a generator object without even beginning execution of the function. When next() is called for the first time, the function starts executing and runs until it reaches a yield statement. The yielded value is returned by that next() call.
def yrange(n):
    i = 0
    x = 1
    while i < n:
        # Yield i + x and pause here until the caller calls next() again.
        yield i + x
        i += 1

if __name__ == "__main__":
    g = yrange(6)
    print(next(g))
A generator returns a value and pauses there; all of its local context is preserved when it is resumed by the next() method. Generators were first used to create memory-friendly iterators, but this behavior also makes them well suited to implementing coroutines.
The caller can use next() as a communication channel to tell the generator to run; the generator pauses at yield and returns a value to the caller.
But what if the caller wants to pass input to the generator during its run? For example, the generator provides an HTML parsing function, while the caller loads the content of a website and passes that content to the generator to parse.
Using a global variable makes this possible. A modified version of the code above:
def yrange(n):
    i = 0
    global x
    while i < n:
        # Yield i + x (using the current global x) and pause here
        # until the caller calls next() again.
        yield i + x
        i += 1

if __name__ == "__main__":
    x = 1
    g = yrange(6)
    print(next(g))
    x = 2
    print(next(g))
This solution is quite ugly, because we usually want to avoid global variables. The safest approach is to pass values to a function as parameters and return values to the caller, rather than relying on shared state as a control mechanism.
Since Python 2.5, yield has been an expression and behaves like a bidirectional communication tool. Generators support next() as well as the send(), throw(), and close() methods.
PEP 342 explains this in detail:
Specification Summary
By adding a few simple methods to the generator-iterator type, and with two minor syntax adjustments, Python developers will be able to use generator functions to implement co-routines and other forms of co-operative multitasking. These methods and adjustments are:
1. Redefine yield to be an expression, rather than a statement. The current yield statement would become a yield expression whose value is thrown away. A yield expression's value is None whenever the generator is resumed by a normal next() call.
2. Add a new send() method for generator-iterators, which resumes the generator and "sends" a value that becomes the result of the current yield-expression. The send() method returns the next value yielded by the generator, or raises StopIteration if the generator exits without yielding another value.
3. Add a new throw() method for generator-iterators, which raises an exception at the point where the generator was paused, and which returns the next value yielded by the generator, raising StopIteration if the generator exits without yielding another value. (If the generator does not catch the passed-in exception, or raises a different exception, then that exception propagates to the caller.)
4. Add a close() method for generator-iterators, which raises GeneratorExit at the point where the generator was paused. If the generator then raises StopIteration (by exiting normally, or due to already being closed) or GeneratorExit (by not catching the exception), close() returns to its caller. If the generator yields a value, a RuntimeError is raised. If the generator raises any other exception, it is propagated to the caller. close() does nothing if the generator has already exited due to an exception or normal exit.
5. Add support to ensure that close() is called when a generator iterator is garbage-collected.
6. Allow yield to be used in try/finally blocks, since garbage collection or an explicit close() call would now allow the finally clause to execute.
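The throw() and close() behavior described in the summary can be tried out with a small generator. This is only an illustrative sketch: guarded_counter and its log list are made-up names, not part of PEP 342.

```python
def guarded_counter(log):
    """Yield 0, 1, 2, ... and record in `log` what happens to the generator."""
    n = 0
    try:
        while True:
            try:
                yield n
                n += 1
            except ValueError:
                # throw(ValueError(...)) is raised at the paused yield and
                # lands here; reset the counter and keep going.
                log.append("reset")
                n = 0
    finally:
        # Runs when close() raises GeneratorExit at the paused yield
        # (or when the generator is garbage-collected).
        log.append("cleanup")


log = []
gen = guarded_counter(log)
first = next(gen)                               # 0
second = next(gen)                              # 1
after_throw = gen.throw(ValueError("reset"))    # handled inside, yields 0
gen.close()                                     # triggers the finally clause
print(first, second, after_throw, log)
```

Because the generator catches the ValueError itself, throw() returns the next yielded value instead of propagating the exception to the caller.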
yield can not only hand a value to the caller but also accept a value from the caller through the send() method. The yield expression evaluates to None if next() is called, so next() is basically equivalent to send(None).
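A quick way to convince yourself of this equivalence is a tiny echo generator. The sketch below uses a made-up echo() function; it also shows why the first resume has to be next() or send(None): a generator that has not started yet has no paused yield expression that could receive a value.

```python
def echo():
    """Yield back whatever the caller sent on the previous resume."""
    received = None
    while True:
        received = yield received


gen_a = echo()
gen_b = echo()
# Resuming with next() and with send(None) gives the same result (None here).
same = next(gen_a) == gen_b.send(None)

# Sending a non-None value into a generator that has not started fails.
fresh = echo()
raised = False
try:
    fresh.send("too early")
except TypeError:
    raised = True

# Once the generator is paused at a yield, send() delivers the value there.
value = gen_a.send("hello")
print(same, raised, value)
```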
Another example is as below:
def corout1(n):
    for x in range(n):
        yield x

def jumping_range(up_to):
    """Generator for the sequence of integers from 0 to up_to, exclusive.
    Sending a value into the generator will shift the sequence by that amount.
    """
    index = 0
    global jump
    while index < up_to:
        jump = yield index
        #print('step is {0}'.format(jump))
        if jump is None:
            jump = 1
        index += jump

if __name__ == '__main__':
    jump = None
    iterator = jumping_range(5)
    print(next(iterator))  # 0
    print('step is {0}'.format(jump))
    print(iterator.send(2))  # 2
    print('step is {0}'.format(jump))
    print(next(iterator))  # 3
    print('step is {0}'.format(jump))
    print(iterator.send(-1))  # 2
    print('step is {0}'.format(jump))
    for x in iterator:
        print(x)  # 3, 4
    gen1 = corout1(6)
    print(next(gen1))
    print(gen1.send('sldf'))  # you can send any value
Here I use the global variable jump as a probe to inspect the internal state of the generator. Output:
In [1]: run test2.py
0
step is None
2
step is 2
3
step is 1
2
step is -1
3
4
0
1
Here jump = yield index assigns the value of the yield expression to jump. If next() is called, yield index evaluates to None, and the generator continues to run until it hits the yield expression again. While the generator is paused, the assignment has not happened yet; assigning to jump is the first step executed when the generator is resumed.
Since next() is equivalent to send(None), you can even avoid next() altogether for consistency. The yield expression then behaves like a transceiver: it first accepts a value from the caller through the send() method, and when the yield expression is reached again, it hands the yielded value back to the caller.
This works fine as long as you fully understand the mechanism of yield in a generator.
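As a small illustration of driving a generator purely through send(), here is a running-average coroutine. It is a sketch with made-up names (averager, avg), not code from any of the PEPs.

```python
def averager():
    """Coroutine that yields the running average of everything sent to it."""
    total = 0.0
    count = 0
    average = None
    while True:
        # Hand the current average to the caller, then wait for the next value.
        value = yield average
        total += value
        count += 1
        average = total / count


avg = averager()
avg.send(None)                  # prime: run up to the first yield
results = [avg.send(v) for v in (10, 20, 60)]
print(results)                  # [10.0, 15.0, 30.0]
```

Each send() call both delivers a value to the paused yield and returns the value of the next yield, which is the transceiver behavior described above.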
PEP 380 introduced yield from to simplify pipelines of coroutines.
A coroutine is not limited to communicating with its caller. It can also send data to another coroutine based on input from its caller. This forms a pipeline of coroutines.
def writer():
    """A coroutine that writes data *sent* to it to fd, socket, etc."""
    z = 'a'
    while True:
        w = (yield z)
        z = chr(97+w)
        print('>> ', w)

def filter(coro):
    coro.send(None)  # prime the coro
    i = 'hello'
    while True:
        try:
            x = (yield i)  # Capture the value that's sent
            i = coro.send(x)  # and pass it to the writer
        except StopIteration:
            pass

if __name__=="__main__":
    g = writer()
    f = filter(g)
    print(f.send(None))  # prime f
    print(f.send(7))
Output:
In [16]: run test3.py
hello
>> 7
h
Capturing exceptions, sending data to the coroutine, and getting results back from it takes quite a bit of code.
yield from takes care of exception handling and of the communication between coroutines, which simplifies the code a lot.
Just like yield, yield from is a bidirectional operation. A revised version using yield from looks like this:
def writer():
    """A coroutine that writes data *sent* to it to fd, socket, etc."""
    z = 'a'
    while True:
        w = (yield z)
        z = chr(97+w)
        print('>> ', w)

def filter(coro):
    #coro.send(None)  # prime the coro
    i = 'hello'
    i = yield from coro

if __name__=="__main__":
    g = writer()
    f = filter(g)
    print(f.send(None))  # prime f
    print(f.send(7))
Output:
In [18]: run test4.py
a
>> 7
h
As you can see, it is much easier to organize coroutine code with the yield from keyword.
Generator-based coroutines together with an event loop make asynchronous programming possible.
It is possible to use return in a generator.
This is a new feature in Python 3.3 (as a comment notes, it doesn't even work in 3.2). Much like return in a generator has long been equivalent to raise StopIteration(), return something in a generator is now equivalent to raise StopIteration(something). For that reason, the exception you're seeing should be printed as StopIteration: 3, and the value is accessible through the attribute value on the exception object. If the generator is delegated to using the (also new) yield from syntax, it is the result. See PEP 380 for details.
def f():
    return 1
    yield 2

def g():
    x = yield from f()
    print(x)

# g() only creates a generator object; iterate over it to run the body.
for _ in g():
    pass  # prints 1
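The two ways of getting at a generator's return value can be put side by side. In this sketch (summing and delegating are made-up names), the caller either reads StopIteration.value itself or lets yield from do it.

```python
def summing(n):
    """Yield 0..n-1, then return their sum."""
    total = 0
    for i in range(n):
        yield i
        total += i
    return total


def delegating(n):
    # yield from re-yields every value of summing() and evaluates to its
    # return value once the sub-generator finishes.
    result = yield from summing(n)
    yield "sum={}".format(result)


gen = summing(3)
seen = []
try:
    while True:
        seen.append(next(gen))
except StopIteration as stop:
    returned = stop.value       # the value given to `return`

delegated = list(delegating(3))
print(seen, returned, delegated)
```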
Generator-based coroutines are somewhat confusing syntactically. It is quite hard to understand the bidirectional communication mechanism of yield/yield from without an in-depth introduction like this article. Generators were first introduced to provide one-at-a-time iteration, and yield was later revised to make generators suitable for coroutines.
The async/await keywords were introduced in Python 3.5 to make the syntax of coroutine programming more meaningful. Inside an async def function, await plays the role that yield from plays in a generator-based coroutine. So in Python 3.5 you can use either generator-based coroutines or async/await coroutines for asynchronous, concurrent code; later releases deprecated the generator-based form (Python 3.8) and removed @asyncio.coroutine (Python 3.11).
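To make the relationship concrete, the sketch below defines two native coroutines (add_later and no_suspend are made-up names). It assumes Python 3.7 or later for asyncio.run(). The second half drives a native coroutine by hand with send(None), which suggests that the underlying protocol is still the generator protocol described earlier.

```python
import asyncio


async def add_later(a, b):
    """Native coroutine: await suspends here much like yield from would."""
    await asyncio.sleep(0)      # give control back to the event loop once
    return a + b


async def no_suspend():
    """Native coroutine that finishes without ever suspending."""
    return "done"


# asyncio.run() creates an event loop and drives the coroutine to completion.
total = asyncio.run(add_later(2, 3))

# A coroutine object can also be resumed manually with send(None); its
# return value arrives in StopIteration.value, just as with generators.
coro = no_suspend()
try:
    coro.send(None)
except StopIteration as stop:
    manual = stop.value

print(total, manual)            # 5 done
```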
Non-blocking program in Python
Async programming is usually discussed in terms of I/O-bound and CPU-bound tasks. An I/O-bound task spends most of its time waiting for data from another process or thread, such as page content sent back from a server or file content returned by disk I/O. A CPU-bound task, in this setting, is one where the code waits for results from another process that is doing heavy computation.
So async programming naturally involves communication between different processes (threads are less effective for CPU-bound work because of the GIL). The currently running coroutine pauses at the point where it needs results from another process. The event loop then takes control and signals another coroutine to run.
So the coroutine has to be non-blocking, which means it needs to check the status of the external process. If the result is ready, it runs some code to process the result; otherwise it yields again to give up control. The external process has to be able to report its running status so that the coroutine can be written in a non-blocking style.
It is possible to mimic async programming with the subprocess module. The subprocess module can spawn a process from the current process; the new process runs independently of the current one and can be used to mimic loading content from a remote server or reading data from a local disk.
server_subp.py simply sleeps for n seconds and prints 'finished!' as its result, which the client prints once it receives it.
import sys
import time

if __name__ == '__main__':
    n = int(sys.argv[1])
    time.sleep(n)
    print('finished!')
client_subp.py
import subprocess
import time

def nb_read(t):
    process = subprocess.Popen(['python3', 'server_subp.py', t], shell=False, stdin=subprocess.PIPE, stdout=subprocess.PIPE, universal_newlines=True)
    # Yield None for as long as the subprocess is still running.
    while process.poll() is None:
        yield None
    # The subprocess has exited: yield its (stdout, stderr) tuple.
    yield process.communicate()

task1 = nb_read('5')
task2 = nb_read('3')
fin1 = False
fin2 = False
while 1:
    if not fin1:
        res1 = task1.send(None)
        if res1 is not None:
            print('task1:'+res1[0])
            fin1 = True
            task1.close()
        else:
            print('still working on task1')
    if not fin2:
        res2 = task2.send(None)
        if res2 is not None:
            print('task2:'+res2[0])
            fin2 = True
            task2.close()
        else:
            print('still working on task2')
    if fin1 and fin2:
        break
    time.sleep(1)
#process.terminate()
nb_read() is a non-blocking style coroutine that loads data. It spawns a process to read the data. Popen.poll() polls the status of the subprocess. If the return value is None, nb_read() yields None to notify the caller that loading has not finished yet, and pauses there. Otherwise, nb_read() uses Popen.communicate() to retrieve the stdout content of the subprocess, which has already exited at that point.
The while 1 loop mimics the event loop in the asyncio library. The code creates two nb_read() generators, and the loop queries whether each has finished loading data. If so, it closes the generator. The loop keeps running until all reading tasks are finished. The loop uses send(None) to tell a generator that it may run now.
That is why a generator-based coroutine has to be decorated with @asyncio.coroutine.
Output of the client_subp.py:
still working on task1
still working on task2
still working on task1
still working on task2
still working on task1
still working on task2
still working on task1
still working on task2
still working on task1
task2:finished!
still working on task1
task1:finished!
The above code demonstrates simple asynchronous programming by polling the state of coroutines. Asynchronous programming implies either querying the status of a coroutine or waiting for it to finish and then executing a callback function.
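The hand-written loop over two named tasks can be generalized to any number of generators with a queue. The following is a minimal sketch under stated assumptions: countdown() is a made-up stand-in for a task that needs several polls before its result is ready (no subprocess involved), and run_all() is a hypothetical round-robin scheduler, not part of asyncio.

```python
from collections import deque


def countdown(name, n):
    """Toy task: needs n polls before its result is ready."""
    while n > 0:
        yield None              # not ready yet, give up control
        n -= 1
    yield "{} done".format(name)


def run_all(tasks):
    """Poll each generator in turn until every one has produced a result."""
    pending = deque(tasks.items())
    finished = []
    while pending:
        name, task = pending.popleft()
        result = task.send(None)            # let the task run one step
        if result is None:
            pending.append((name, task))    # still working, ask again later
        else:
            finished.append(result)
            task.close()
    return finished


order = run_all({"task1": countdown("task1", 5), "task2": countdown("task2", 3)})
print(order)                    # ['task2 done', 'task1 done']
```

As in the subprocess example, the shorter task finishes first even though it was started second.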
Callbacks are good
When a non-blocking API accepts a callback, this removes the responsibility of checking the result from the developer. Using a long car ride as an analogy, a callback is the equivalent of saying “I’m going to have a nap; when we arrive, somebody wake me up.” It is not important who does the waking up, provided that being woken up is expected. The sleeping person has every right to be upset if they are woken up early, or if they are woken in an inappropriate way.
However, it is likely that a callback will not arrive the instant that the operation completes, such as when the car driver unloads the bags before waking the sleeper. Also, the callback may not come at a suitable time for the sleeper (in the middle of a good dream, perhaps), which could also be problematic. (In practice, general-purpose operating systems do not allow this to occur.) There may also be limited guarantees as to where a callback is executed with respect to threads, program state and locks, restricting the amount of work that can safely be performed in a callback.
Callbacks are a well-defined contract between the application and the system managing asynchronous operations, easy for the developer to implement, but poorly defined with respect to timeliness and coherent program state.
Polling is also good
In many cases, non-blocking APIs prefer polling, making it the developer’s responsibility to check whether the operation has completed. Continuing the above analogy: “Are we there yet? Are we there yet? Are we there yet? Are we there yet?”
Assuming that the operating system does not tell the asker to stop asking (which does actually happen in some rate-limited scenarios), eventually the response will be provided. Because this is a reply and not an interruption, it can be guaranteed to appear at a suitable location within the code, outside of any locks that may interfere with it.
The downside of polling is that when the “Are we there yet?” asker has to take a deep breath, they are not going to find out whether they have arrived until they ask again. If they become distracted by some other task, they may forget or not have the opportunity to ask for a while. In enclosed systems it is possible to interleave a long operation with unrelated polling, but this is rarely composable in a way that suits a standard API.
Polling is also a well-defined contract, ensures program state is safe, allows the developer to prioritise asking versus doing other work, but is difficult to compose without full cooperation between all developers and libraries.
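Both styles can be seen side by side with the standard concurrent.futures module, whose Future objects support add_done_callback() as well as done(). This is only an illustration of the two contracts, with slow_square as a made-up stand-in for a slow operation; it is not how asyncio is implemented.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def slow_square(x):
    time.sleep(0.05)            # stand-in for a slow I/O operation
    return x * x


callback_results = []
polls = 0

with ThreadPoolExecutor(max_workers=2) as pool:
    # Callback style: "wake me up when we arrive".
    woken = pool.submit(slow_square, 3)
    woken.add_done_callback(lambda fut: callback_results.append(fut.result()))

    # Polling style: "are we there yet?"
    asked = pool.submit(slow_square, 4)
    while not asked.done():
        polls += 1
        time.sleep(0.01)        # do other work between questions
    polled_result = asked.result()

# Leaving the with block waits for the workers, so the callback has run.
print(callback_results, polled_result)
```

Note that the callback here typically runs in the worker thread, which is an instance of the limited guarantees about where a callback executes, while the polled result is consumed at a point the caller chooses.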
In Python 3.5, the asyncio library actually mixes the polling and callback styles. The core classes of asyncio are BaseEventLoop, Task/Future, and Handle.
BaseEventLoop is the base class of all event loop classes and is defined in base_events.py. An event loop holds the _ready and _scheduled queues, both of which contain instances of the Handle class. The main job of an event loop instance is to run Handle._run() for the handles in _ready, and to schedule Handle._run() for the handles in _scheduled based on their time parameters.
_ready is an instance of collections.deque().
Most important methods are run_forever(self), run_until_complete(self, future), _run_once(self).
The call_soon() method wraps a callback function and its arguments into a Handle object and appends it to the _ready queue.
call_at() and call_later() wrap a callback function and its arguments into a TimerHandle object and push it onto the _scheduled queue.
_run_once() is the core method of BaseEventLoop. It moves the handles in _scheduled that are due into _ready and runs the handles in _ready. run_forever(self) and run_until_complete(self, future) are wrappers around _run_once().
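The interplay of the two queues can be sketched in a few lines. ToyLoop below is a hypothetical miniature written for this article: the attribute and method names mirror the description above, but the code is not asyncio's implementation, and run_until_idle() is an invented helper.

```python
import heapq
import itertools
import time
from collections import deque


class ToyLoop:
    """Hypothetical miniature loop; names mirror the text, not asyncio's code."""

    def __init__(self):
        self._ready = deque()       # callbacks to run on the next pass
        self._scheduled = []        # heap of (when, seq, callback, args)
        self._seq = itertools.count()

    def call_soon(self, callback, *args):
        self._ready.append((callback, args))

    def call_later(self, delay, callback, *args):
        when = time.monotonic() + delay
        heapq.heappush(self._scheduled, (when, next(self._seq), callback, args))

    def _run_once(self):
        # Move timers that are due from the heap into the ready queue.
        now = time.monotonic()
        while self._scheduled and self._scheduled[0][0] <= now:
            _, _, callback, args = heapq.heappop(self._scheduled)
            self._ready.append((callback, args))
        # Run only the callbacks that were ready at the start of this pass.
        for _ in range(len(self._ready)):
            callback, args = self._ready.popleft()
            callback(*args)

    def run_until_idle(self):
        while self._ready or self._scheduled:
            self._run_once()
            if not self._ready and self._scheduled:
                # Sleep until the earliest timer is due instead of spinning.
                time.sleep(max(0.0, self._scheduled[0][0] - time.monotonic()))


calls = []
loop = ToyLoop()
loop.call_later(0.02, calls.append, "later")
loop.call_soon(calls.append, "soon")
loop.call_soon(calls.append, "soon again")
loop.run_until_idle()
print(calls)                    # ['soon', 'soon again', 'later']
```

The loop never needs to know what a callback does; it only decides when to call it.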
Class Handle(func, arg) is a container for a function and its arguments. Handle._run() is equivalent to func(arg). So when eventLoop._run_once() calls Handle._run(), it does not need to know the details of the function it is running.
Class Future is basically a state machine that records whether or not it is finished. It implements the __iter__() method so that it can be used with yield from. It uses the done() method to check whether the instance is finished. If so, it returns self.result(); otherwise it yields self.
A Future instance contains a _callbacks list holding all the callback functions that are supposed to be executed after it is finished.
The Future class has a set_result() method that marks the future as done and sets the result. It then calls _schedule_callbacks(), which registers the callbacks in the _callbacks list with the event loop the future belongs to through loop.call_soon().
The Future class has add_done_callback(self, fn) and remove_done_callback(self, fn) to add callback functions to, and remove them from, the _callbacks list.
A Future contains a reference to its event loop instance so that it can schedule its callbacks there. A Future instance is, however, supposed to be created by the event loop instance.
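The point that set_result() only schedules the callbacks, rather than calling them directly, can be observed with the public asyncio API. The sketch assumes Python 3.7 or later (asyncio.run() and asyncio.get_running_loop()); the events list is just a made-up recorder.

```python
import asyncio


async def main():
    loop = asyncio.get_running_loop()
    fut = loop.create_future()          # the loop creates the future
    events = []
    fut.add_done_callback(lambda f: events.append(("callback", f.result())))

    fut.set_result(42)                  # marks the future done, schedules callbacks
    events.append(("after set_result", fut.done()))

    await asyncio.sleep(0)              # let the loop run the scheduled callback
    return events


events = asyncio.run(main())
print(events)
```

The "after set_result" entry is recorded first: the callback only runs once control returns to the event loop.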
Class Task is a subclass of Future. The difference is that its __init__() method wraps a coroutine object.
A Task instance is created in an event loop instance by create_task(), which requires a coroutine object as its parameter (asyncio.ensure_future() is the helper that accepts either a coroutine or a future).
Task uses _step() to wrap the communication between itself and the coroutine, such as send(), next() and throw(). So task._step() behaves like the caller of the coroutine, and there is no need to maintain the communication with the coroutine yourself.
The __init__() method of Task registers _step() as a callback in the event loop instance through call_soon(). So loop._run_once() will run task._step() repeatedly until the coroutine finishes.
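The idea behind _step() can be sketched with a toy task that drives a plain generator. ToyLoop and ToyTask are hypothetical classes written for illustration only; the real Task additionally handles futures yielded by the coroutine, exceptions, and cancellation.

```python
from collections import deque


class ToyLoop:
    """Hypothetical stand-in for the event loop: just a ready queue."""

    def __init__(self):
        self._ready = deque()

    def call_soon(self, callback, *args):
        self._ready.append((callback, args))

    def run(self):
        while self._ready:
            callback, args = self._ready.popleft()
            callback(*args)


class ToyTask:
    """Hypothetical sketch of the idea behind Task._step(), not asyncio's code."""

    def __init__(self, coro, loop):
        self._coro = coro
        self._loop = loop
        self.done = False
        self.result = None
        loop.call_soon(self._step)          # schedule the first step

    def _step(self):
        try:
            self._coro.send(None)           # run the coroutine to its next yield
        except StopIteration as stop:
            self.done = True                # the coroutine returned
            self.result = stop.value
        else:
            self._loop.call_soon(self._step)    # not finished: go again later


def worker(name, steps, trace):
    for i in range(steps):
        trace.append((name, i))
        yield                               # hand control back to the loop
    return name + " finished"


trace = []
loop = ToyLoop()
a = ToyTask(worker("A", 2, trace), loop)
b = ToyTask(worker("B", 1, trace), loop)
loop.run()
print(trace, a.result, b.result)
```

The trace shows the two workers interleaving, even though the loop itself only ever calls opaque callbacks.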
Example from the official documentation:
import asyncio

# Note: @asyncio.coroutine was deprecated in Python 3.8 and removed in 3.11.
@asyncio.coroutine
def factorial(name, number):
    f = 1
    for i in range(2, number+1):
        print("Task %s: Compute factorial(%s)..." % (name, i))
        yield from asyncio.sleep(1)
        f *= i
    print("Task %s: factorial(%s) = %s" % (name, number, f))

loop = asyncio.get_event_loop()
tasks = [
    asyncio.ensure_future(factorial("A", 2)),
    asyncio.ensure_future(factorial("B", 3)),
    asyncio.ensure_future(factorial("C", 4))]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()
The purpose of asyncio.ensure_future() is to wrap the coroutine into a task. The task instance takes responsibility for communicating with the coroutine and reports its running status. The task instance also registers its _step() function in the _ready queue inside the event loop instance. So eventloop._run_once() calls task._step(), while task._step() calls the coroutine. The event loop does not need to know the running status of a task. A task instance will unregister all its callback functions from the event loop when it is finished.
asyncio.ensure_future(coro) uses get_event_loop() to find the current event loop instance _loop and calls _loop.create_task(coro). The create_task(coro) method calls Task.__init__(coro), which wraps coro behind the task._step() method and registers task._step() in the _loop._ready queue through _loop.call_soon().
Then loop.run_until_complete(task) is called. The run_until_complete(task) method calls run_forever() and then checks whether the task is finished by calling task.done(). If it is done, it returns task.result(); otherwise it raises an exception saying that the event loop stopped before the task completed.
_loop.run_forever() wraps the _loop._run_once() method. _run_once() deals with the _scheduled heap first: if a handle's time has come, it is moved into the _ready queue. Then it simply calls handle._run() for each handle instance in _ready.
Remember that the task._step() method is also wrapped in a Handle instance. So the coroutine is only ever called from within the _loop._run_once() method.
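For comparison, the same factorial example can be written with async/await. This is a sketch rather than the documentation's code: it assumes Python 3.7 or later for asyncio.run(), the delay parameter is an addition so that the example finishes quickly, and asyncio.gather() is used to wrap the coroutines in tasks instead of calling ensure_future() by hand.

```python
import asyncio


async def factorial(name, number, delay=0.01):
    """async/await version of the example; `delay` is an added parameter."""
    f = 1
    for i in range(2, number + 1):
        print("Task %s: Compute factorial(%s)..." % (name, i))
        await asyncio.sleep(delay)      # suspend; the loop runs other tasks
        f *= i
    print("Task %s: factorial(%s) = %s" % (name, number, f))
    return f


async def main():
    # gather() wraps each coroutine in a Task and waits for all of them.
    return await asyncio.gather(
        factorial("A", 2),
        factorial("B", 3),
        factorial("C", 4),
    )


results = asyncio.run(main())
print(results)                  # [2, 6, 24]
```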
Reference:
What is new in Python 3.3
PEP380
In practice, what are the main uses for the new “yield from” syntax in Python 3.3?
PEP342
How the heck does async/await work in Python 3.5?
ASYNCHRONOUS API FOR PYTHON
Understanding Non Blocking I/O with Python - Part 1